Solutions/ImpervaCloudWAF/Data Connectors/ImpervaWAFCloudSentinelConnector/__init__.py (221 lines of code) (raw):

# Azure Function timer trigger: pulls Imperva Cloud WAF log files from the
# Imperva log server, parses the CEF events they contain and forwards them to
# an Azure Sentinel (Log Analytics) workspace via the HTTP Data Collector API.

import base64
import datetime
import hashlib
import hmac
import json
import logging
import os
import re
import zlib

import requests
from requests.packages.urllib3.util.retry import Retry
import urllib3

import azure.functions as func

from .state_manager import StateManager

# Required configuration, read from the Function App settings.
customer_id = os.environ['WorkspaceID']
shared_key = os.environ['WorkspaceKey']
imperva_waf_api_id = os.environ['ImpervaAPIID']
imperva_waf_api_key = os.environ['ImpervaAPIKey']
imperva_waf_log_server_uri = os.environ['ImpervaLogServerURI']
# Decryption of encrypted log files is not implemented: the key is always
# empty, so encrypted files are skipped (see decrypt_and_unpack_file).
logs_encryption_private_key = ""
connection_string = os.environ['AzureWebJobsStorage']

logAnalyticsUri = os.environ.get('logAnalyticsUri')
if logAnalyticsUri in (None, '') or str(logAnalyticsUri).isspace():
    logAnalyticsUri = 'https://' + customer_id + '.ods.opinsights.azure.com'

pattern = r"https:\/\/([\w\-]+)\.ods\.opinsights\.azure.([a-zA-Z\.]+)$"
match = re.match(pattern, str(logAnalyticsUri))
if not match:
    raise Exception("Invalid Log Analytics Uri.")


class ImpervaFilesHandler:
    """Downloads the Imperva log index, tracks progress and processes new files."""

    def __init__(self):
        self.url = imperva_waf_log_server_uri
        # Retry transient server errors and throttling with exponential backoff.
        retries = Retry(
            total=3,
            status_forcelist={500, 429},
            backoff_factor=1,
            respect_retry_after_header=True
        )
        adapter = requests.adapters.HTTPAdapter(max_retries=retries)
        self.session = requests.Session()
        self.session.mount('https://', adapter)
        self.auth = urllib3.make_headers(
            basic_auth='{}:{}'.format(imperva_waf_api_id, imperva_waf_api_key))
        self.files_array = self.list_index_file()
        self.sentinel = ProcessToSentinel()

    def list_index_file(self):
        """Download logs.index and return its lines (log file names), or None on failure."""
        files_array = []
        try:
            r = self.session.get(url="{}/{}".format(self.url, "logs.index"),
                                 headers=self.auth)
            if 200 <= r.status_code <= 299:
                logging.info("Successfully downloaded index file.")
                for line in r.iter_lines():
                    files_array.append(line.decode('UTF-8'))
                return files_array
            elif r.status_code == 400:
                logging.error("Bad Request. The request was invalid or cannot be otherwise served."
                              " Error code: {}".format(r.status_code))
            elif r.status_code == 404:
                logging.error("Could not find index file. Response code is {}".format(r.status_code))
            elif r.status_code == 401:
                logging.error("Authorization error - Failed to download index file. Response code is {}".format(r.status_code))
            elif r.status_code == 429:
                logging.error("Rate limit exceeded - Failed to download index file. Response code is {}".format(r.status_code))
            else:
                # status_code of a completed response is always an int, so the
                # original "status_code is None" branch was unreachable.
                logging.error("Something wrong. Error code: {}".format(r.status_code))
        except Exception as err:
            logging.error("Something wrong. Exception error text: {}".format(err))

    def last_file_point(self):
        """Return the index entries newer than the saved checkpoint and advance it.

        NOTE(review): the checkpoint is advanced to the newest index entry
        before the files are actually downloaded, so files that fail to
        download are not retried on the next run (original behavior, kept).
        """
        try:
            if self.files_array is None:
                return None
            state = StateManager(connection_string=connection_string)
            past_file = state.get()
            if past_file is not None:
                logging.info("The last file point is: {}".format(past_file))
                try:
                    index = self.files_array.index(past_file)
                    files_arr = self.files_array[index + 1:]
                except ValueError as err:
                    # Checkpointed file no longer listed: reprocess everything.
                    logging.info("Last point file detection error: {}. So Processing all the files from index file".format(err))
                    files_arr = self.files_array
            else:
                files_arr = self.files_array
            logging.info("There are {} files in the list index file.".format(len(files_arr)))
            # Guard against an empty index (original indexed [-1] unconditionally).
            if self.files_array:
                state.post(self.files_array[-1])
            return files_arr
        except Exception as err:
            logging.error("Last point file detection error. Exception error text: {}".format(err))

    def download_files(self):
        """Download and process every file newer than the checkpoint."""
        files_for_download = self.last_file_point()
        if files_for_download is not None:
            for file in files_for_download:
                logging.info("Downloading file {}".format(file))
                self.download_file(file)

    def download_file(self, file_name):
        """Download a single log file and hand its content to the unpacker.

        Returns the HTTP status code on success, None otherwise.
        """
        try:
            r = self.session.get(url="{}/{}".format(self.url, file_name),
                                 stream=True,
                                 headers=self.auth)
            if 200 <= r.status_code <= 299:
                logging.info("Successfully downloaded file: {}".format(file_name))
                self.decrypt_and_unpack_file(file_name, r.content)
                return r.status_code
            elif r.status_code == 400:
                logging.error("Bad Request. The request was invalid or cannot be otherwise served."
                              " Error code: {}".format(r.status_code))
            elif r.status_code == 404:
                logging.error("Could not find file {}. Response code: {}".format(file_name, r.status_code))
            elif r.status_code == 401:
                logging.error("Authorization error - Failed to download file {}. Response code: {}".format(file_name, r.status_code))
            elif r.status_code == 429:
                # fix: message previously read "downloadfile"
                logging.error("Rate limit exceeded - Failed to download file {}. Response code: {}".format(file_name, r.status_code))
            else:
                logging.error("Something wrong. Error code: {}".format(r.status_code))
        except Exception as err:
            logging.error("Something wrong. Exception error text: {}".format(err))

    def decrypt_and_unpack_file(self, file_name, file_content):
        """Split an Imperva log file into header/body, decompress the body and
        forward the CEF events it contains to Sentinel in chunks of 1000."""
        logging.info("Unpacking and decrypting file {}".format(file_name))
        file_splitted = file_content.split(b"|==|\n")
        file_header = file_splitted[0].decode("utf-8")
        file_data = file_splitted[1]
        # A "key:" entry in the header marks an encrypted file.
        file_encryption_flag = file_header.find("key:")
        events_arr = []
        events_data = None  # fix: was unbound (NameError) on the error paths below
        if file_encryption_flag == -1:
            try:
                events_data = zlib.decompressobj().decompress(file_data).decode("utf-8")
            except Exception as err:
                # Some files arrive uncompressed; zlib then reports a header error.
                if 'while decompressing data: incorrect header check' in err.args[0]:
                    events_data = file_data.decode("utf-8")
                else:
                    logging.error("Error during decompressing and decoding the file with error message {}.".format(err))
        else:
            # No private key is configured (logs_encryption_private_key is
            # empty), so encrypted files cannot be decrypted. The original
            # code crashed with a NameError here instead of skipping cleanly.
            logging.error("File {} is encrypted and cannot be processed - skipping.".format(file_name))
        if events_data is not None:
            for line in events_data.splitlines():
                if "CEF" in line:
                    event_message = self.parse_cef(line)
                    events_arr.append(event_message)
            for chunk in self.gen_chunks_to_object(events_arr, chunksize=1000):
                self.sentinel.post_data(json.dumps(chunk), len(chunk), file_name)

    def parse_cef(self, cef_raw):
        """Parse one CEF line into a flat dict of fields for Log Analytics."""
        rx = r'([^=\s\|]+)?=((?:[\\]=|[^=])+)(?:\s|$)'
        parsed_cef = {"EventVendor": "Imperva",
                      "EventProduct": "Incapsula",
                      "EventType": "SIEMintegration"}
        # CEF header fields are pipe-separated; positions 3-6 carry
        # device version, signature, attack name and severity.
        header_array = cef_raw.split('|')
        parsed_cef["Device Version"] = header_array[3]
        parsed_cef["Signature"] = header_array[4]
        parsed_cef["Attack Name"] = header_array[5]
        parsed_cef["Attack Severity"] = header_array[6]
        for key, val in re.findall(rx, cef_raw):
            if val.startswith('"') and val.endswith('"'):
                val = val[1:-1]
            parsed_cef[key] = val
        # Replace each csN/csNLabel pair by a single field named after the label.
        cs_array = ['cs1', 'cs2', 'cs3', 'cs4', 'cs5', 'cs6', 'cs7', 'cs8', 'cs9']
        for elem in cs_array:
            try:
                if parsed_cef[elem] is not None:
                    parsed_cef[(parsed_cef[f'{elem}Label']).replace(" ", "")] = parsed_cef[elem]
                    parsed_cef.pop(f'{elem}Label')
                    parsed_cef.pop(elem)
            except KeyError:
                # As per the documentation, availability of some csN fields
                # (e.g. cs6 and its label) depends on the account plan; absent
                # fields are simply skipped.
                pass
        if 'start' in parsed_cef.keys() and parsed_cef['start'] is not None and parsed_cef['start'] != "":
            try:
                # 'start' is epoch milliseconds.
                timestamp = datetime.datetime.utcfromtimestamp(int(parsed_cef['start']) / 1000.0).isoformat()
                parsed_cef['EventGeneratedTime'] = timestamp
            except (ValueError, OverflowError, OSError):  # fix: was a bare except
                parsed_cef['EventGeneratedTime'] = ""
        else:
            parsed_cef['EventGeneratedTime'] = ""
        return parsed_cef

    def gen_chunks_to_object(self, object, chunksize=100):
        """Yield successive lists of at most *chunksize* items from *object*.

        Fix: the original yielded one shared list and cleared it in place
        (del chunk[:]), so a consumer holding a reference saw it emptied
        afterwards, and an empty input produced one empty chunk (posting
        "[]" to Sentinel). Fresh lists are yielded now and a trailing empty
        chunk is suppressed.
        """
        chunk = []
        for line in object:
            chunk.append(line)
            if len(chunk) == chunksize:
                yield chunk
                chunk = []
        if chunk:
            yield chunk


class ProcessToSentinel:
    """Sends event batches to Log Analytics via the HTTP Data Collector API."""

    def __init__(self):
        self.logAnalyticsUri = logAnalyticsUri

    def build_signature(self, date, content_length, method, content_type, resource):
        """Build the SharedKey authorization header for the Data Collector API."""
        x_headers = 'x-ms-date:' + date
        string_to_hash = method + "\n" + str(content_length) + "\n" + content_type + "\n" + x_headers + "\n" + resource
        bytes_to_hash = bytes(string_to_hash, encoding="utf-8")
        decoded_key = base64.b64decode(shared_key)
        encoded_hash = base64.b64encode(
            hmac.new(decoded_key, bytes_to_hash, digestmod=hashlib.sha256).digest()).decode()
        authorization = "SharedKey {}:{}".format(customer_id, encoded_hash)
        return authorization

    def post_data(self, body, chunk_count, file_name):
        """POST one JSON-serialized chunk to the workspace; logs the outcome."""
        method = 'POST'
        content_type = 'application/json'
        resource = '/api/logs'
        # RFC 1123 date required by the Data Collector API signature.
        rfc1123date = datetime.datetime.utcnow().strftime('%a, %d %b %Y %H:%M:%S GMT')
        content_length = len(body)
        signature = self.build_signature(rfc1123date, content_length, method, content_type, resource)
        uri = self.logAnalyticsUri + resource + '?api-version=2016-04-01'
        headers = {
            'content-type': content_type,
            'Authorization': signature,
            'Log-Type': 'ImpervaWAFCloud',
            'x-ms-date': rfc1123date,
            'time-generated-field': 'EventGeneratedTime'
        }
        response = requests.post(uri, data=body, headers=headers)
        if 200 <= response.status_code <= 299:
            logging.info("Chunk was processed with {} events from the file: {}".format(chunk_count, file_name))
        else:
            logging.error("Error during sending events to Azure Sentinel. Response code:{}. File name: {}.".format(response.status_code, file_name))


def main(mytimer: func.TimerRequest) -> None:
    """Timer-trigger entry point: download and forward all new log files."""
    if mytimer.past_due:
        logging.info('The timer is past due!')
    logging.info('Starting program')
    ifh = ImpervaFilesHandler()
    ifh.download_files()